# copyright (c) 2020 paddlepaddle authors. all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import tempfile
import unittest

import numpy as np

import paddle
from paddle import Model, fluid, jit, to_tensor
from paddle.hapi.model import prepare_distributed_context
from paddle.io import Dataset, DistributedBatchSampler
from paddle.metric import Accuracy
from paddle.nn import Conv2D, Linear, ReLU, Sequential
from paddle.nn.layer.loss import CrossEntropyLoss
from paddle.static import InputSpec
from paddle.vision import models
from paddle.vision.datasets import MNIST
from paddle.vision.models import LeNet


class LeNetDygraph(paddle.nn.Layer):
    def __init__(self, num_classes=10):
        super().__init__()
        self.num_classes = num_classes
        self.features = Sequential(
            Conv2D(1, 6, 3, stride=1, padding=1),
            ReLU(),
            paddle.nn.MaxPool2D(2, 2),
            Conv2D(6, 16, 5, stride=1, padding=0),
            ReLU(),
            paddle.nn.MaxPool2D(2, 2),
        )

        if num_classes > 0:
            self.fc = Sequential(
                Linear(400, 120), Linear(120, 84), Linear(84, 10)
            )

    def forward(self, inputs):
        x = self.features(inputs)

        if self.num_classes > 0:
            x = paddle.flatten(x, 1, -1)
            x = self.fc(x)
        return x


class ModelInner(paddle.nn.Layer):
    def __init__(self):
        super().__init__()
        self.fc = paddle.nn.Linear(3, 4)

    def forward(self, x):
        y = self.fc(x)
        return y, 0


class ModelOutter(paddle.nn.Layer):
    def __init__(self):
        super().__init__()
        self.module1 = ModelInner()
        self.module2 = paddle.nn.Linear(4, 5)

    def forward(self, x):
        y, dummpy = self.module1(x)
        y = self.module2(y)
        return y, 3


class LeNetListInput(paddle.nn.Layer):
    def __init__(self, num_classes=10):
        super().__init__()
        self.num_classes = num_classes
        self.cov = Conv2D(1, 6, 3, stride=1, padding=1)
        for param in self.cov.parameters():
            param.trainable = False
        self.features = Sequential(
            self.cov,
            ReLU(),
            paddle.nn.MaxPool2D(2, 2),
            Conv2D(6, 16, 5, stride=1, padding=0),
            ReLU(),
            paddle.nn.MaxPool2D(2, 2),
        )

        if num_classes > 0:
            self.fc = Sequential(
                Linear(400, 120), Linear(120, 84), Linear(84, 10)
            )

    def forward(self, inputs):
        x = inputs[0]
        x = self.features(x)

        if self.num_classes > 0:
            x = paddle.flatten(x, 1)
            x = self.fc(x + inputs[1])
        return x


class LeNetDictInput(LeNetDygraph):
    def forward(self, inputs):
        x = self.features(inputs['x1'])

        if self.num_classes > 0:
            x = paddle.flatten(x, 1)
            x = self.fc(x + inputs['x2'])
        return x


class MnistDataset(MNIST):
    def __init__(self, mode, return_label=True, sample_num=None):
        super().__init__(mode=mode)
        self.return_label = return_label
        if sample_num:
            self.images = self.images[:sample_num]
            self.labels = self.labels[:sample_num]

    def __getitem__(self, idx):
        img, label = self.images[idx], self.labels[idx]
        img = np.reshape(img, [1, 28, 28])
        if self.return_label:
            return img, np.array(self.labels[idx]).astype('int64')
        return (img,)

    def __len__(self):
        return len(self.images)


def compute_acc(pred, label):
    pred = np.argmax(pred, -1)
    label = np.array(label)
    correct = pred[:, np.newaxis] == label
    return np.sum(correct) / correct.shape[0]


def dynamic_train(model, dataloader):
    optim = paddle.optimizer.Adam(
        learning_rate=0.001, parameters=model.parameters()
    )
    model.train()
    for inputs, labels in dataloader:
        outputs = model(inputs)
        loss = CrossEntropyLoss(reduction="sum")(outputs, labels)
        avg_loss = paddle.sum(loss)
        avg_loss.backward()
        optim.minimize(avg_loss)
        model.clear_gradients()


def dynamic_evaluate(model, dataloader):
    with fluid.dygraph.no_grad():
        model.eval()
        cnt = 0
        for inputs, labels in dataloader:
            outputs = model(inputs)

            cnt += (
                (
                    np.argmax(outputs.numpy(), -1)[:, np.newaxis]
                    == labels.numpy()
                )
                .astype('int')
                .sum()
            )

    return cnt / len(dataloader.dataset)


@unittest.skipIf(
    not fluid.is_compiled_with_cuda(), 'CPU testing is not supported'
)
class TestModel(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        if not fluid.is_compiled_with_cuda():
            cls().skipTest('module not tested when ONLY_CPU compling')
        cls.device = paddle.set_device('gpu')
        fluid.enable_dygraph(cls.device)

        sp_num = 1280
        cls.train_dataset = MnistDataset(mode='train', sample_num=sp_num)
        cls.val_dataset = MnistDataset(mode='test', sample_num=sp_num)
        cls.test_dataset = MnistDataset(
            mode='test', return_label=False, sample_num=sp_num
        )

        cls.train_loader = paddle.io.DataLoader(
            cls.train_dataset, places=cls.device, batch_size=64
        )
        cls.val_loader = paddle.io.DataLoader(
            cls.val_dataset, places=cls.device, batch_size=64
        )
        cls.test_loader = paddle.io.DataLoader(
            cls.test_dataset, places=cls.device, batch_size=64
        )

        seed = 555
        paddle.seed(seed)
        paddle.framework.random._manual_program_seed(seed)

        dy_lenet = LeNetDygraph()
        cls.init_param = dy_lenet.state_dict()
        dynamic_train(dy_lenet, cls.train_loader)

        cls.acc1 = dynamic_evaluate(dy_lenet, cls.val_loader)

        cls.inputs = [InputSpec([-1, 1, 28, 28], 'float32', 'image')]
        cls.labels = [InputSpec([None, 1], 'int64', 'label')]

        cls.save_dir = os.path.join(tempfile.mkdtemp(), '.cache_test_model')
        if not os.path.exists(cls.save_dir):
            os.makedirs(cls.save_dir)
        cls.weight_path = os.path.join(cls.save_dir, 'lenet')
        paddle.save(dy_lenet.state_dict(), cls.weight_path + '.pdparams')

        fluid.disable_dygraph()

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.save_dir)

    def test_fit_dygraph(self):
        self.fit(True)

    def test_fit_static(self):
        self.fit(False)

    def test_fit_dynamic_with_tuple_input(self):
        self.fit_with_tuple_input(True)

    def test_fit_static_with_tuple_input(self):
        self.fit_with_tuple_input(False)

    def test_fit_dynamic_with_rank(self):
        self.fit(True, 2, 0)

    def test_fit_static_with_rank(self):
        self.fit(False, 2, 0)

    def test_fit_dynamic_with_num_iters(self):
        self.fit(True, num_iters=1)

    def test_fit_static_with_num_iters(self):
        self.fit(False, num_iters=1)

    def test_evaluate_dygraph(self):
        self.evaluate(True)

    def test_evaluate_static(self):
        self.evaluate(False)

    def test_predict_dygraph(self):
        self.predict(True)

    def test_predict_static(self):
        self.predict(False)

    def test_prepare_context(self):
        prepare_distributed_context()

    def fit(self, dynamic, num_replicas=None, rank=None, num_iters=None):
        fluid.enable_dygraph(self.device) if dynamic else None
        seed = 555
        paddle.seed(seed)
        paddle.framework.random._manual_program_seed(seed)

        net = LeNet()
        optim_new = paddle.optimizer.Adam(
            learning_rate=0.001, parameters=net.parameters()
        )
        model = Model(net, inputs=self.inputs, labels=self.labels)
        model.prepare(
            optim_new,
            loss=CrossEntropyLoss(reduction="sum"),
            metrics=Accuracy(),
        )
        model.fit(self.train_dataset, batch_size=64, shuffle=False)

        result = model.evaluate(self.val_dataset, batch_size=64)
        np.testing.assert_allclose(result['acc'], self.acc1)

        model.fit(
            self.train_dataset,
            batch_size=64,
            shuffle=False,
            num_iters=num_iters,
        )

        result = model.evaluate(
            self.val_dataset, batch_size=64, num_iters=num_iters
        )

        model.fit(self.train_dataset, batch_size=(64, 64), shuffle=False)

        train_sampler = DistributedBatchSampler(
            self.train_dataset,
            batch_size=64,
            shuffle=False,
            num_replicas=num_replicas,
            rank=rank,
        )
        val_sampler = DistributedBatchSampler(
            self.val_dataset,
            batch_size=64,
            shuffle=False,
            num_replicas=num_replicas,
            rank=rank,
        )

        train_loader = paddle.io.DataLoader(
            self.train_dataset,
            batch_sampler=train_sampler,
            places=self.device,
            return_list=True,
        )

        val_loader = paddle.io.DataLoader(
            self.val_dataset,
            batch_sampler=val_sampler,
            places=self.device,
            return_list=True,
        )

        model.fit(train_loader, val_loader)
        fluid.disable_dygraph() if dynamic else None

    def fit_with_tuple_input(self, dynamic, num_replicas=None, rank=None):
        fluid.enable_dygraph(self.device) if dynamic else None
        seed = 555
        paddle.seed(seed)
        paddle.framework.random._manual_program_seed(seed)

        net = LeNet()
        optim_new = paddle.optimizer.Adam(
            learning_rate=0.001, parameters=net.parameters()
        )
        model = Model(net, inputs=tuple(self.inputs), labels=tuple(self.labels))
        model.prepare(
            optim_new,
            loss=CrossEntropyLoss(reduction="sum"),
            metrics=Accuracy(),
        )
        model.fit(self.train_dataset, batch_size=64, shuffle=False)

        result = model.evaluate(self.val_dataset, batch_size=64)
        np.testing.assert_allclose(result['acc'], self.acc1)

        train_sampler = DistributedBatchSampler(
            self.train_dataset,
            batch_size=64,
            shuffle=False,
            num_replicas=num_replicas,
            rank=rank,
        )
        val_sampler = DistributedBatchSampler(
            self.val_dataset,
            batch_size=64,
            shuffle=False,
            num_replicas=num_replicas,
            rank=rank,
        )

        train_loader = paddle.io.DataLoader(
            self.train_dataset,
            batch_sampler=train_sampler,
            places=self.device,
            return_list=True,
        )

        val_loader = paddle.io.DataLoader(
            self.val_dataset,
            batch_sampler=val_sampler,
            places=self.device,
            return_list=True,
        )

        model.fit(train_loader, val_loader)
        fluid.disable_dygraph() if dynamic else None

    def evaluate(self, dynamic):
        fluid.enable_dygraph(self.device) if dynamic else None
        model = Model(LeNet(), self.inputs, self.labels)
        model.prepare(metrics=Accuracy())
        model.load(self.weight_path)
        result = model.evaluate(self.val_dataset, batch_size=64)
        np.testing.assert_allclose(result['acc'], self.acc1)

        sampler = DistributedBatchSampler(
            self.val_dataset, batch_size=64, shuffle=False
        )

        val_loader = paddle.io.DataLoader(
            self.val_dataset,
            batch_sampler=sampler,
            places=self.device,
            return_list=True,
        )

        model.evaluate(val_loader)

        fluid.disable_dygraph() if dynamic else None

    def predict(self, dynamic):
        fluid.enable_dygraph(self.device) if dynamic else None
        model = Model(LeNet(), self.inputs)
        model.prepare()
        model.load(self.weight_path)
        output = model.predict(
            self.test_dataset, batch_size=64, stack_outputs=True
        )
        np.testing.assert_equal(output[0].shape[0], len(self.test_dataset))

        acc = compute_acc(output[0], self.val_dataset.labels)
        np.testing.assert_allclose(acc, self.acc1)

        sampler = DistributedBatchSampler(
            self.test_dataset, batch_size=64, shuffle=False
        )

        test_loader = paddle.io.DataLoader(
            self.test_dataset,
            batch_sampler=sampler,
            places=self.device,
            return_list=True,
        )

        model.evaluate(test_loader)

        fluid.disable_dygraph() if dynamic else None

    def test_predict_without_inputs(self):
        fluid.enable_dygraph(self.device)
        model = Model(LeNet())
        model.prepare()
        model.load(self.weight_path)
        model._inputs = None
        output = model.predict(
            self.test_dataset, batch_size=64, stack_outputs=True
        )
        np.testing.assert_equal(output[0].shape[0], len(self.test_dataset))
        fluid.disable_dygraph()

    def test_summary_gpu(self):
        paddle.disable_static(self.device)
        rnn = paddle.nn.LSTM(16, 32, 2)
        params_info = paddle.summary(
            rnn, [(-1, 23, 16), ((2, None, 32), (2, -1, 32))]
        )


class MyModel(paddle.nn.Layer):
    def __init__(self):
        super().__init__()
        self._fc = Linear(20, 10)

    def forward(self, x):
        y = self._fc(x)
        return y


class MyDataset(Dataset):
    def __getitem__(self, idx):
        return np.random.random(size=(20,)).astype(
            np.float32
        ), np.random.randint(0, 10, size=(1,)).astype(np.int64)

    def __len__(self):
        return 40


class TestModelFunction(unittest.TestCase):
    def set_seed(self, seed=1024):
        paddle.seed(seed)
        paddle.framework.random._manual_program_seed(seed)

    def test_train_batch(self, dynamic=True):
        dim = 20
        data = np.random.random(size=(4, dim)).astype(np.float32)
        label = np.random.randint(0, 10, size=(4, 1)).astype(np.int64)

        def get_expect():
            fluid.enable_dygraph(fluid.CPUPlace())
            self.set_seed()
            m = MyModel()
            optim = paddle.optimizer.SGD(
                learning_rate=0.001, parameters=m.parameters()
            )
            m.train()
            output = m(to_tensor(data))
            loss = CrossEntropyLoss(reduction='sum')(output, to_tensor(label))
            avg_loss = paddle.sum(loss)
            avg_loss.backward()
            optim.minimize(avg_loss)
            m.clear_gradients()
            fluid.disable_dygraph()
            return avg_loss.numpy()

        ref = get_expect()
        for dynamic in [True, False]:
            device = paddle.set_device('cpu')
            fluid.enable_dygraph(device) if dynamic else None
            self.set_seed()

            net = MyModel()
            optim2 = paddle.optimizer.SGD(
                learning_rate=0.001, parameters=net.parameters()
            )

            inputs = [InputSpec([None, dim], 'float32', 'x')]
            labels = [InputSpec([None, 1], 'int64', 'label')]
            model = Model(net, inputs, labels)
            model.prepare(optim2, loss=CrossEntropyLoss(reduction="sum"))
            (loss,) = model.train_batch([data], [label])
            np.testing.assert_allclose(loss.flatten(), ref.flatten())
            fluid.disable_dygraph() if dynamic else None

    def test_test_batch(self):
        dim = 20
        data = np.random.random(size=(4, dim)).astype(np.float32)

        def get_expect():
            fluid.enable_dygraph(fluid.CPUPlace())
            self.set_seed()
            m = MyModel()
            m.eval()
            output = m(to_tensor(data))
            fluid.disable_dygraph()
            return output.numpy()

        ref = get_expect()
        for dynamic in [True, False]:
            device = paddle.set_device('cpu')
            fluid.enable_dygraph(device) if dynamic else None
            self.set_seed()
            net = MyModel()
            inputs = [InputSpec([None, dim], 'float32', 'x')]
            model = Model(net, inputs)
            model.prepare()
            (out,) = model.predict_batch([data])

            np.testing.assert_allclose(out, ref, rtol=1e-6)
            fluid.disable_dygraph() if dynamic else None

    def test_save_load(self):
        path = os.path.join(tempfile.mkdtemp(), '.cache_test_save_load')
        if not os.path.exists(path):
            os.makedirs(path)
        for dynamic in [True, False]:
            device = paddle.set_device('cpu')
            fluid.enable_dygraph(device) if dynamic else None
            net = MyModel()
            inputs = [InputSpec([None, 20], 'float32', 'x')]
            labels = [InputSpec([None, 1], 'int64', 'label')]
            optim = paddle.optimizer.SGD(
                learning_rate=0.001, parameters=net.parameters()
            )
            model = Model(net, inputs, labels)
            model.prepare(
                optimizer=optim, loss=CrossEntropyLoss(reduction="sum")
            )
            model.save(path)
            model.load(path)
            fluid.disable_dygraph() if dynamic else None
        shutil.rmtree(path)

    def test_dynamic_load(self):
        mnist_data = MnistDataset(mode='train')

        path = os.path.join(tempfile.mkdtemp(), '.cache_dynamic_load')
        if not os.path.exists(path):
            os.makedirs(path)

        for new_optimizer in [True, False]:
            paddle.disable_static()
            net = LeNet()
            inputs = [InputSpec([None, 1, 28, 28], 'float32', 'x')]
            labels = [InputSpec([None, 1], 'int64', 'label')]
            if new_optimizer:
                optim = paddle.optimizer.Adam(
                    learning_rate=0.001, parameters=net.parameters()
                )
            else:
                optim = paddle.optimizer.Adam(
                    learning_rate=0.001, parameters=net.parameters()
                )
            model = Model(net, inputs, labels)
            model.prepare(
                optimizer=optim, loss=CrossEntropyLoss(reduction="sum")
            )
            model.fit(mnist_data, batch_size=64, verbose=0)
            model.save(path)
            model.load(path)
            paddle.enable_static()
        shutil.rmtree(path)

    def test_dynamic_save_static_load(self):
        path = os.path.join(
            tempfile.mkdtemp(), '.cache_dynamic_save_static_load'
        )
        if not os.path.exists(path):
            os.makedirs(path)
        # dynamic saving
        device = paddle.set_device('cpu')
        fluid.enable_dygraph(device)
        model = Model(MyModel())
        optim = paddle.optimizer.SGD(
            learning_rate=0.001, parameters=model.parameters()
        )
        model.prepare(optimizer=optim, loss=CrossEntropyLoss(reduction="sum"))
        model.save(path)
        fluid.disable_dygraph()

        inputs = [InputSpec([None, 20], 'float32', 'x')]
        labels = [InputSpec([None, 1], 'int64', 'label')]
        model = Model(MyModel(), inputs, labels)
        optim = paddle.optimizer.SGD(
            learning_rate=0.001, parameters=model.parameters()
        )
        model.prepare(optimizer=optim, loss=CrossEntropyLoss(reduction="sum"))
        model.load(path)
        shutil.rmtree(path)

    def test_static_save_dynamic_load(self):
        path = os.path.join(
            tempfile.mkdtemp(), '.cache_test_static_save_dynamic_load'
        )
        if not os.path.exists(path):
            os.makedirs(path)
        net = MyModel()
        inputs = [InputSpec([None, 20], 'float32', 'x')]
        labels = [InputSpec([None, 1], 'int64', 'label')]
        optim = paddle.optimizer.SGD(
            learning_rate=0.001, parameters=net.parameters()
        )
        model = Model(net, inputs, labels)
        model.prepare(optimizer=optim, loss=CrossEntropyLoss(reduction="sum"))
        model.save(path)

        device = paddle.set_device('cpu')
        fluid.enable_dygraph(device)  # if dynamic else None

        net = MyModel()
        inputs = [InputSpec([None, 20], 'float32', 'x')]
        labels = [InputSpec([None, 1], 'int64', 'label')]
        optim = paddle.optimizer.SGD(
            learning_rate=0.001, parameters=net.parameters()
        )
        model = Model(net, inputs, labels)
        model.prepare(optimizer=optim, loss=CrossEntropyLoss(reduction="sum"))
        model.load(path)
        shutil.rmtree(path)
        fluid.disable_dygraph()

    def test_parameters(self):
        for dynamic in [True, False]:
            device = paddle.set_device('cpu')
            fluid.enable_dygraph(device) if dynamic else None
            net = MyModel()
            inputs = [InputSpec([None, 20], 'float32', 'x')]
            model = Model(net, inputs)
            model.prepare()
            params = model.parameters()
            self.assertTrue(params[0].shape[0] == 20)
            self.assertTrue(params[0].shape[1] == 10)
            fluid.disable_dygraph() if dynamic else None

    def test_summary(self):
        def _get_param_from_state_dict(state_dict):
            params = 0
            for k, v in state_dict.items():
                params += np.prod(v.numpy().shape)
            return params

        for dynamic in [True, False]:
            device = paddle.set_device('cpu')
            fluid.enable_dygraph(device) if dynamic else None
            net = MyModel()
            inputs = [InputSpec([None, 20], 'float32', 'x')]
            model = Model(net, inputs)
            model.prepare()
            params_info = model.summary()
            gt_params = _get_param_from_state_dict(net.state_dict())

            np.testing.assert_allclose(params_info['total_params'], gt_params)
            print(params_info)

            model.summary(input_size=(20))
            model.summary(input_size=[(20)])
            model.summary(input_size=(20), dtype='float32')

    def test_summary_non_tensor(self):
        paddle.summary(ModelOutter(), input_size=(-1, 3))

    def test_summary_nlp(self):
        def _get_param_from_state_dict(state_dict):
            params = 0
            for k, v in state_dict.items():
                params += np.prod(v.numpy().shape)
            return params

        nlp_net = paddle.nn.GRU(
            input_size=2, hidden_size=3, num_layers=3, direction="bidirectional"
        )
        paddle.summary(nlp_net, (1, 1, 2))

        rnn = paddle.nn.LSTM(16, 32, 2)
        params_info = paddle.summary(
            rnn, [(-1, 23, 16), ((2, None, 32), (2, -1, 32))]
        )
        gt_params = _get_param_from_state_dict(rnn.state_dict())
        np.testing.assert_allclose(params_info['total_params'], gt_params / 2.0)

        rnn = paddle.nn.GRU(16, 32, 2, direction='bidirectional')
        params_info = paddle.summary(rnn, (4, 23, 16))
        gt_params = _get_param_from_state_dict(rnn.state_dict())
        np.testing.assert_allclose(params_info['total_params'], gt_params / 2.0)

        rnn = paddle.nn.SimpleRNN(16, 32, 2, direction='bidirectional')
        params_info = paddle.summary(rnn, (4, 23, 16))
        gt_params = _get_param_from_state_dict(rnn.state_dict())
        np.testing.assert_allclose(params_info['total_params'], gt_params / 2.0)

    def test_summary_input(self):
        paddle.enable_static()
        mymodel = MyModel()
        input_data = paddle.rand([1, 20])
        paddle.summary(mymodel, input=input_data)
        paddle.disable_static()

        rnn = paddle.nn.SimpleRNN(16, 32, 2, direction='bidirectional')
        input_data = paddle.rand([4, 23, 16])
        paddle.summary(rnn, input=input_data)

        lenet_List_input = LeNetListInput()
        input_data = [paddle.rand([1, 1, 28, 28]), paddle.rand([1, 400])]
        paddle.summary(lenet_List_input, input=input_data)

        lenet_dict_input = LeNetDictInput()
        input_data = {
            'x1': paddle.rand([1, 1, 28, 28]),
            'x2': paddle.rand([1, 400]),
        }
        paddle.summary(lenet_dict_input, input=input_data)

    def test_summary_dtype(self):
        input_shape = (3, 1)
        net = paddle.nn.Embedding(10, 3, sparse=True)
        paddle.summary(net, input_shape, dtypes='int64')

    def test_summary_error(self):
        with self.assertRaises(TypeError):
            nlp_net = paddle.nn.GRU(input_size=2, hidden_size=3, num_layers=3)
            paddle.summary(nlp_net, (1, 1, '2'))

        with self.assertRaises(ValueError):
            nlp_net = paddle.nn.GRU(input_size=2, hidden_size=3, num_layers=3)
            paddle.summary(nlp_net, (-1, -1))

        paddle.disable_static()
        nlp_net = paddle.nn.GRU(input_size=2, hidden_size=3, num_layers=3)
        paddle.summary(nlp_net, (1, 1, 2))

    def test_static_flops(self):
        if True:
            return
        paddle.disable_static()
        net = models.__dict__['mobilenet_v2'](pretrained=False)
        inputs = paddle.randn([1, 3, 224, 224])
        static_program = jit._trace(net, inputs=[inputs])[1]
        paddle.flops(static_program, [1, 3, 224, 224], print_detail=True)

    def test_dynamic_flops(self):
        net = models.__dict__['mobilenet_v2'](pretrained=False)

        def customize_dropout(m, x, y):
            m.total_ops += 0

        paddle.flops(
            net,
            [1, 3, 224, 224],
            custom_ops={paddle.nn.Dropout: customize_dropout},
            print_detail=True,
        )

    def test_dynamic_flops_with_multiple_outputs(self):
        net = paddle.nn.MaxPool2D(
            kernel_size=2, stride=2, padding=0, return_mask=True
        )

        def customize_dropout(m, x, y):
            m.total_ops += 0

        paddle.flops(
            net,
            [1, 2, 32, 32],
            custom_ops={paddle.nn.Dropout: customize_dropout},
            print_detail=True,
        )

    def test_export_deploy_model(self):
        self.set_seed()
        np.random.seed(201)

        save_dir = os.path.join(
            tempfile.mkdtemp(), '.cache_test_export_deploy_model'
        )
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)

        for dynamic in [True, False]:
            paddle.disable_static() if dynamic else None
            paddle.jit.enable_to_static(False) if not dynamic else None

            net = LeNet()
            inputs = [InputSpec([None, 1, 28, 28], 'float32', 'x')]
            model = Model(net, inputs)
            model.prepare()

            tensor_img = np.array(
                np.random.random((1, 1, 28, 28)), dtype=np.float32
            )

            model.save(save_dir, training=False)
            ori_results = model.predict_batch(tensor_img)
            fluid.disable_dygraph() if dynamic else None

            place = (
                fluid.CPUPlace()
                if not fluid.is_compiled_with_cuda()
                else fluid.CUDAPlace(0)
            )
            new_scope = fluid.Scope()
            with fluid.scope_guard(new_scope):
                exe = fluid.Executor(place)
                [
                    inference_program,
                    feed_target_names,
                    fetch_targets,
                ] = paddle.static.io.load_inference_model(
                    path_prefix=save_dir, executor=exe
                )
                results = exe.run(
                    inference_program,
                    feed={feed_target_names[0]: tensor_img},
                    fetch_list=fetch_targets,
                )
                np.testing.assert_allclose(
                    results, ori_results, rtol=1e-5, atol=1e-6
                )

            paddle.enable_static()

        shutil.rmtree(save_dir)

    def test_dygraph_export_deploy_model_about_inputs(self):
        self.set_seed()
        np.random.seed(201)
        mnist_data = MnistDataset(mode='train')
        paddle.disable_static()
        # without inputs
        save_dir = os.path.join(
            tempfile.mkdtemp(), '.cache_test_dygraph_export_deploy'
        )
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        for initial in ["fit", "train_batch", "eval_batch", "predict_batch"]:
            net = LeNet()
            model = Model(net)
            optim = paddle.optimizer.Adam(
                learning_rate=0.001, parameters=model.parameters()
            )
            model.prepare(
                optimizer=optim, loss=CrossEntropyLoss(reduction="sum")
            )
            if initial == "fit":
                model.fit(mnist_data, batch_size=64, verbose=0)
            else:
                img = np.array(
                    np.random.random((1, 1, 28, 28)), dtype=np.float32
                )
                label = np.array(np.random.rand(1, 1), dtype=np.int64)
                if initial == "train_batch":
                    model.train_batch([img], [label])
                elif initial == "eval_batch":
                    model.eval_batch([img], [label])
                else:
                    model.predict_batch([img])

            model.save(save_dir, training=False)
        shutil.rmtree(save_dir)
        # with inputs, and the type of inputs is InputSpec
        save_dir = os.path.join(
            tempfile.mkdtemp(), '.cache_test_dygraph_export_deploy_2'
        )
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        net = LeNet()
        inputs = InputSpec([None, 1, 28, 28], 'float32', 'x')
        model = Model(net, inputs)
        optim = paddle.optimizer.Adam(
            learning_rate=0.001, parameters=model.parameters()
        )
        model.prepare(optimizer=optim, loss=CrossEntropyLoss(reduction="sum"))
        model.save(save_dir, training=False)
        shutil.rmtree(save_dir)

    def test_accumulate(
        self,
    ):
        dim = 20
        data = np.random.random(size=(4, dim)).astype(np.float32)
        label = np.random.randint(0, 10, size=(4, 1)).astype(np.int64)
        net = MyModel()
        optim = paddle.optimizer.SGD(
            learning_rate=0.001, parameters=net.parameters()
        )
        inputs = [InputSpec([None, dim], 'float32', 'x')]
        labels = [InputSpec([None, 1], 'int64', 'label')]

        for amp_cfg in [None, 'O1']:
            model = Model(net, inputs, labels)
            model.prepare(
                optim,
                loss=CrossEntropyLoss(reduction="sum"),
                amp_configs=amp_cfg,
            )
            losses, grads = [], []
            for stat in [False, False, True]:
                (loss,) = model.train_batch([data], [label], update=stat)
                losses.append(loss)
                grads.append([p.grad.numpy() for p in net.parameters()])

            for grad1, grad2, grad3 in zip(*grads):
                np.testing.assert_almost_equal(grad1 * 2, grad2, decimal=4)
                np.testing.assert_almost_equal(
                    grad3, np.zeros_like(grad3), decimal=4
                )

            np.testing.assert_almost_equal(losses[0], losses[1], decimal=4)
            np.testing.assert_almost_equal(losses[0], losses[2], decimal=4)


class TestModelWithLRScheduler(unittest.TestCase):
    def test_fit_by_step(self):
        base_lr = 1e-3
        boundaries = [5, 8]

        def make_optimizer(parameters=None):
            momentum = 0.9
            weight_decay = 5e-4
            values = [base_lr * (0.1**i) for i in range(len(boundaries) + 1)]
            learning_rate = paddle.optimizer.lr.PiecewiseDecay(
                boundaries=boundaries, values=values
            )
            learning_rate = paddle.optimizer.lr.LinearWarmup(
                learning_rate=learning_rate,
                warmup_steps=4,
                start_lr=base_lr / 5.0,
                end_lr=base_lr,
                verbose=True,
            )
            optimizer = paddle.optimizer.Momentum(
                learning_rate=learning_rate,
                weight_decay=weight_decay,
                momentum=momentum,
                parameters=parameters,
            )
            return optimizer

        # dynamic test
        device = paddle.set_device('cpu')
        fluid.enable_dygraph(device)
        net = MyModel()
        inputs = [InputSpec([None, 20], 'float32', 'x')]
        labels = [InputSpec([None, 1], 'int64', 'label')]
        optim = make_optimizer(net.parameters())
        model = Model(net, inputs, labels)
        model.prepare(optimizer=optim, loss=CrossEntropyLoss(reduction="sum"))

        dataset = MyDataset()
        model.fit(dataset, dataset, batch_size=4, epochs=10, num_workers=0)

        np.testing.assert_allclose(
            model._optimizer._learning_rate.last_lr,
            base_lr * (0.1 ** len(boundaries)),
        )
        # static test
        paddle.enable_static()

        net = MyModel()
        inputs = [InputSpec([None, 20], 'float32', 'x')]
        labels = [InputSpec([None, 1], 'int64', 'label')]
        optim = make_optimizer(net.parameters())
        model = Model(net, inputs, labels)
        model.prepare(optimizer=optim, loss=CrossEntropyLoss(reduction="sum"))

        dataset = MyDataset()
        model.fit(dataset, dataset, batch_size=4, epochs=10, num_workers=0)

        np.testing.assert_allclose(
            model._optimizer._learning_rate.last_lr,
            base_lr * (0.1 ** len(boundaries)),
        )

    def test_fit_by_epoch(self):
        base_lr = 1e-3
        boundaries = [5, 8]
        epochs = 10
        wamup_epochs = 4

        def make_optimizer(parameters=None):
            momentum = 0.9
            weight_decay = 5e-4
            values = [base_lr * (0.1**i) for i in range(len(boundaries) + 1)]
            learning_rate = paddle.optimizer.lr.PiecewiseDecay(
                boundaries=boundaries, values=values
            )
            learning_rate = paddle.optimizer.lr.LinearWarmup(
                learning_rate=learning_rate,
                warmup_steps=wamup_epochs,
                start_lr=base_lr / 5.0,
                end_lr=base_lr,
                verbose=True,
            )
            optimizer = paddle.optimizer.Momentum(
                learning_rate=learning_rate,
                weight_decay=weight_decay,
                momentum=momentum,
                parameters=parameters,
            )
            return optimizer

        # dynamic test
        device = paddle.set_device('cpu')
        fluid.enable_dygraph(device)
        net = MyModel()
        inputs = [InputSpec([None, 20], 'float32', 'x')]
        labels = [InputSpec([None, 1], 'int64', 'label')]
        optim = make_optimizer(net.parameters())
        model = Model(net, inputs, labels)
        model.prepare(optimizer=optim, loss=CrossEntropyLoss(reduction="sum"))

        dataset = MyDataset()

        lr_scheduler_callback = paddle.callbacks.LRScheduler(
            by_step=False, by_epoch=True
        )

        model.fit(
            dataset,
            dataset,
            batch_size=4,
            epochs=epochs,
            num_workers=0,
            callbacks=lr_scheduler_callback,
        )

        cnt = 0
        for b in boundaries:
            if b + wamup_epochs <= epochs:
                cnt += 1

        np.testing.assert_allclose(
            model._optimizer._learning_rate.last_lr, base_lr * (0.1**cnt)
        )
        # static test
        paddle.enable_static()

        net = MyModel()
        inputs = [InputSpec([None, 20], 'float32', 'x')]
        labels = [InputSpec([None, 1], 'int64', 'label')]
        optim = make_optimizer(net.parameters())
        model = Model(net, inputs, labels)
        model.prepare(optimizer=optim, loss=CrossEntropyLoss(reduction="sum"))

        dataset = MyDataset()

        lr_scheduler_callback = paddle.callbacks.LRScheduler(
            by_step=False, by_epoch=True
        )

        model.fit(
            dataset,
            dataset,
            batch_size=4,
            epochs=epochs,
            num_workers=0,
            callbacks=lr_scheduler_callback,
        )

        cnt = 0
        for b in boundaries:
            if b + wamup_epochs <= epochs:
                cnt += 1

        np.testing.assert_allclose(
            model._optimizer._learning_rate.last_lr, base_lr * (0.1**cnt)
        )


class TestRaiseError(unittest.TestCase):
    def test_input_without_name(self):
        net = MyModel()
        inputs = [InputSpec([None, 10], 'float32')]
        labels = [InputSpec([None, 1], 'int64', 'label')]
        with self.assertRaises(ValueError):
            model = Model(net, inputs, labels)

    def test_static_without_inputs(self):
        paddle.enable_static()
        net = MyModel()
        with self.assertRaises(TypeError):
            model = Model(net)

    def test_save_infer_model_without_inputs_and_run_in_dygraph(self):
        paddle.disable_static()
        net = MyModel()
        save_dir = os.path.join(tempfile.mkdtemp(), '.cache_test_save_infer')
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        with self.assertRaises(RuntimeError):
            model = Model(net)
            model.save(save_dir, training=False)
        paddle.enable_static()
        shutil.rmtree(save_dir)

    def test_save_infer_model_without_file_prefix(self):
        paddle.enable_static()
        net = LeNet()
        inputs = [InputSpec([None, 1, 28, 28], 'float32', 'x')]
        model = Model(net, inputs)
        model.prepare()
        path = ""
        tensor_img = np.array(
            np.random.random((1, 1, 28, 28)), dtype=np.float32
        )
        with self.assertRaises(ValueError):
            model.save(path, training=False)


if __name__ == '__main__':
    unittest.main()
